使用streamlit建立一個可以輸入EdgeQL
、query_args
及query_kwargs
的form
,並於submit
之後,傳送query
至EdgeDB Cloud
執行(註1
)。
ECC
├── ...
├── st_comps.py
├── st_data_structures.py
├── st_utils.py
└── streamlit_app.py
為擺放各種streamlit的widget及element的檔案。
只有一個FormContent
的NamedTuple
,於收集form
內容時使用。
內有著許多小工具,我們挑選幾個比較重要的function
來說明。
get_loop_dict
與get_conn_dict
皆使用@st.cache_resource
裝飾。這麼一來,當任何session
一進來,都可以取得同一個loop_dict
與conn_dict
,可以幫助我們使用現在的timestamp
與存在其中的timestamp
進行對比,進而執行想要的操作。
@st.cache_resource
def get_loop_dict() -> dict[str, Any]:
return {}
@st.cache_resource
def get_conn_dict() -> dict[str, Any]:
return {}
_routine_clean
會取得現在的timestamp
,以此計算get_loop_dict
與get_conn_dict
中是否有超過threshold
的loop
或conn
。相當於每次呼叫streamlit_app.py
時,會定時清除閒置過久的資源。
def _routine_clean(excluded_tokens: list[str],
threshold: float = 300) -> None:
cur_ts = get_cur_ts()
ld = get_loop_dict()
to_del_loop_tokens = {t
for t, (_, loop_ts) in ld.items()
if cur_ts - loop_ts > threshold}
for _token in excluded_tokens:
to_del_loop_tokens.discard(_token)
for k in to_del_loop_tokens:
try:
del ld[k]
except Exception as ex:
st.toast(f'{ex=} happened in del loops', icon="🚨")
cd = get_conn_dict()
to_del_conn_tokens = {t
for t, (_, conn_ts) in cd.items()
if cur_ts - conn_ts > threshold}
for _token in excluded_tokens:
to_del_conn_tokens.discard(_token)
for k in to_del_conn_tokens:
try:
del cd[k]
except Exception as ex:
st.toast(f'{ex=} happened in del conns', icon="🚨")
_populate_qry_args
將接收到的str
以;
分隔,接著檢查各個arg_str
是否為支援的型態。如果是的話,嘗試使用eval(arg_str)
取得轉換後的型態,再append
到qry_args
,最後返回return tuple(qry_args)
。
def _populate_qry_args(qry_args_str: str) -> tuple[Any, ...]:
qry_args: list[Any] = []
for arg_str in qry_args_str.split(';'):
if arg_str.strip() and \
isinstance(arg_str, (str, datetime.date, datetime.datetime)):
try:
eval_arg = eval(arg_str)
except SyntaxError as e:
st.warning(
'Can not parse the positional query arguments!')
raise e
else:
qry_args.append(eval_arg)
return tuple(qry_args)
_populate_qry_kwargs
將接收到的str
以;
分隔,接著嘗試使用exec(kwarg_str.strip(), globals(), qry_kwargs)
,將kwarg_str
populate到qry_kwargs
中,並於最後返回qry_kwargs
。
def _populate_qry_kwargs(qry_kwargs_str: str) -> dict[str, Any]:
qry_kwargs: dict[str, Any] = {}
for kwarg_str in qry_kwargs_str.split(';'):
try:
exec(kwarg_str.strip(), globals(), qry_kwargs)
except SyntaxError as e:
st.warning(
'Can not parse the named query arguments!')
raise e
return qry_kwargs
_convert_form_to_record
將form
接收的內容轉換為QueryRecord
格式。
我們曾經想在_receive_required_single
中的st.radio
使用Enum
,但streamlit常會報錯,所以只好接收str
型態再使用convert_str_to_required_single
轉為Enum
。
def _convert_form_to_record(form: FormContent) -> QueryRecord:
qry = form.qry
extra_args = _populate_qry_args(form.qry_args_str)
jsonify = convert_bool_to_jsonify(form.jsonify)
required_single = convert_str_to_required_single(form.required_single)
extra_kwargs = _populate_qry_kwargs(form.qry_kwargs_str)
task_name = uuid.uuid4().hex[:6]
return QueryRecord(qry,
extra_args,
jsonify,
required_single,
extra_kwargs,
task_name)
_create_task_from_form
使用傳入的tg
來新增task
。
async def _create_task_from_form(tg: asyncio.TaskGroup,
conn: EdgeDBCloudConn,
form: FormContent,
tasks: set[asyncio.Task[Any]]) -> None:
record = _convert_form_to_record(form)
async with conn:
task = tg.create_task(conn.query(record.qry,
*record.extra_args,
jsonify=record.jsonify,
required_single=record.required_single,
**record.extra_kwargs),
name=record.task_name)
tasks.add(task)
loop
我們希望多個session
能同時獨立操作,所以需要針對每個session
建立loop
及conn
,無法簡單的呼叫asyncio.run
就好。由於loop
及conn
都需要在每個session
一開始就確定下來,所以如果將_prepare_loop
或_prepare_conn
移至其它檔案會出現問題。
...
import nest_asyncio
nest_asyncio.apply()
if 'token' not in st.session_state:
token = generate_token()
logging.info(f'Generating token: {token}')
st.session_state['token'] = token
if __name__ == '__main__':
cur_ts = get_cur_ts()
token = st.session_state.token
excluded_tokens = [token]
loop = _prepare_loop(cur_ts, token)
conn = _prepare_conn(cur_ts, token)
_display_res(token, loop, conn, excluded_tokens)
_routine_clean(excluded_tokens)
asyncio.set_event_loop(loop)
loop.run_until_complete(run(main, conn, token))
session
一開始時,呼叫generate_token
產生獨特的token
,並儲存於st.session_state
中。_prepare_loop
準備loop
。_prepare_conn
準備conn
。_display_res
於最上方顯示resource
,並生成refresh
與try free resource
兩個button。_routine_clean
定時清理loop
與conn
。asyncio.set_event_loop
設定loop
。loop.run_until_complete
執行run
,啟動event loop。_prepare_loop
get_loop_dict
取得loop_dict
。token
是否在loop_dict
中。如果不在的話,呼叫asyncio.new_event_loop
建立一個新loop
;如果在的話,從中取出loop
。loop_dict[token] = (loop, cur_ts)
更新timestamp
。loop
。def _prepare_loop(cur_ts: int, token: str) -> asyncio.AbstractEventLoop:
loop_dict = get_loop_dict()
if token not in loop_dict:
loop = asyncio.new_event_loop()
else:
loop, _ = loop_dict[token]
loop_dict[token] = (loop, cur_ts)
return loop
_prepare_conn
get_conn_dict
取得conn_dict
。token
是否在conn_dict
中。如果不在的話,呼叫asyncio.new_event_loop
建立一個新conn
;如果在的話,從中取出conn
。conn_dict[token] = (conn, cur_ts)
更新timestamp
。conn
。def _prepare_conn(cur_ts: int, token: str) -> EdgeDBCloudConn:
conn_dict = get_conn_dict()
if token not in conn_dict:
conn = EdgeDBCloudConn(**load_st_toml())
else:
conn, _ = conn_dict[token]
conn_dict[token] = (conn, cur_ts)
return conn
run
為asyncio
所執行的coroutine
,其內為一個try-except*-else
結構。
try
中,使用asyncio.TaskGroup
將app
整體布局的algo
(即main
function
)加入到task
。請注意,這邊我們用到[Day26]新學到的技巧,來將tg
往下傳給algo
。這麼一來,除了當前這個task
外,algo
內也可以新增其它的task
。except* Exception as ex
中,經過render
後,印出各Exception
的錯誤資訊。else
中,呈現query
的結果。...
tasks = set()
async def run(algo, conn: EdgeDBCloudConn, token: str) -> None:
top_name = 'top'
try:
async with asyncio.TaskGroup() as tg:
task = tg.create_task(algo(tg, conn, token), name=top_name)
tasks.add(task)
except* Exception as ex:
for exc in ex.exceptions:
st.warning(f'Exception: {type(exc).__name__}')
_render_exception(exc)
else:
for task in tasks:
if (task_name := task.get_name()) != top_name:
st.write(f'task_name: {task_name}')
_render_result(task.result())
main
為整個app的layout
。
_display_sidebar
,建立sidebar
element
。_get_query_form
,建立form
element
。form
被submit
時,呼叫_create_tasks_for_form
搜集form
中各項資料並整理後,建立query
task
。_display_big_red_btn_and_db_calls
,建立一個清除conn
的小工具。...
async def main(tg: asyncio.TaskGroup, conn: EdgeDBCloudConn, token: str) -> None:
_display_sidebar()
form = _get_query_form()
if form.submitted:
await _create_tasks_for_form(tg, conn, form)
_display_big_red_btn_and_db_calls(conn, token)
streamlit cloud
可以部署無限制的public app
及一個private app
。
您可以使用private app
或是使用public app
加上authenticator來部署。
部署的過程很簡單,只需完成下列各個選項。所有的credentials
可以置於Advanced settings
中,並於app內使用st.secrets存取。這相當於local
開發時的.streamlit/secrets.toml
。
這個project還有非常多可以改進的地方,例如:
get_loop_dict
及get_conn_dict
這樣的方法,在連線數較少時可以使用,但是當連線數較多時,記憶體使用量也會增加不少。或許我們可以將其轉為其它格式,例如pickle
,然後使用另一個背景程式來定時改動及讀取pickle
。Try Free Res
是可以清除掉所有threshold
大於3
的loops
及conns
。_populate_qry_args
與_populate_qry_kwargs
需要使用比eval
與exec
更安全的處理方式。mocking
引入tests
。type annotation
還有很大的進步空間。app
是設計以asyncio
長期等待task
,所以db connection不需關閉。但當遇到shutdown時,如何有效關閉所有的connections,還需要好好想想。儘管如此,還是學習到了很多,例如:
asyncio.TaskGroup
與ExceptionGroup
來處理asyncio
問題。streamlit_app.py
。註1:EdgeDB Cloud
已有很好的UI操控介面可以使用,本日內容純屬自我練習之用。
註2:需使用nest_asyncio.apply
來防止更新loop
或conn
時,容易出現的RuntimeError: Task <Task pending name='Task-xxx' ...> attached to a different loop
。